[fix][io] KCA sink: handle null values with KeyValue<Avro,Avro> schema #19861
(cherry picked from commit f3747a1978c539c3f5a5f408988b9cc9125471a9)
Lgtm
Nice catch
Looking at this a bit deeper: the fix handles this for the top-level KeyValue schema, but the issue may reappear if the schema is generated for something like
and the nested value is null. We may need to remove the "if (kafkaSchema.isOptional())" check in KafkaConnectData.defaultOrThrow().
Actually, in Pulsar a KeyValue schema allows null for both the key and the value. @dlg99, nesting KeyValue schemas is not allowed in Pulsar. The KeyValue schema is a (bad) trick to provide a Schema for the key of the message. Nested KeyValue schemas are not supported (we may issue a warning or throw an error in Pulsar 3.0) and cannot work properly, so this schema doesn't work.
That's fine, nest any other Pulsar schema. The problem is the loss of the optional flag in the nested schema during translation from the Kafka schema to Pulsar (the source -> topic -> transform -> sink chain).
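To make the concern in this thread concrete, here is a minimal sketch, not the adaptor's actual code. `Node` and `firstRejectedNull` are hypothetical stand-ins for a schema tree and a strict null check. It assumes that only the top-level value schema is forced to optional, while a nested field has lost its optional flag along the way.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class NestedNullSketch {

    /** Hypothetical stand-in for a schema: an optional flag plus nested field schemas. */
    static final class Node {
        final boolean optional;
        final Map<String, Node> fields = new LinkedHashMap<>();

        Node(boolean optional) {
            this.optional = optional;
        }

        Node field(String name, Node schema) {
            fields.put(name, schema);
            return this;
        }
    }

    /**
     * Strict check: returns the path of the first null found under a
     * non-optional schema, or null when every null is permitted.
     */
    static String firstRejectedNull(Object value, Node schema, String path) {
        if (value == null) {
            return schema.optional ? null : path;
        }
        if (value instanceof Map) {
            Map<?, ?> struct = (Map<?, ?>) value;
            for (Map.Entry<String, Node> e : schema.fields.entrySet()) {
                String rejected = firstRejectedNull(
                        struct.get(e.getKey()), e.getValue(), path + "." + e.getKey());
                if (rejected != null) {
                    return rejected;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Top-level schema forced to optional; nested "inner" lost its optional flag.
        Node valueSchema = new Node(true).field("inner", new Node(false));

        Map<String, Object> struct = new HashMap<>();
        struct.put("inner", null);

        System.out.println(firstRejectedNull(null, valueSchema, "value"));
        System.out.println(firstRejectedNull(struct, valueSchema, "value"));
    }
}
```

In this model a null top-level value passes, but a null in the nested field is still rejected, which is the situation the comment about dropping the optional check is trying to avoid.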
...a-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -80,6 +153,11 @@ private static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
        return parser.parse(schemaJson);
    }

    public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
        Schema s = getKafkaConnectSchema(pulsarSchema);
        return new OptionalForcingSchema(s);
The only place it is used is within the cache's get() call:
    return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> {
        if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
            KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
            return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
                            getOptionalKafkaConnectSchema(kvSchema.getValueSchema()))
                    .build();
        }
LGTM
apache#19861) Co-authored-by: Andrey Yegorov <andrey.yegorov@datastax.com> (cherry picked from commit 03f8b80)
apache#19861) (#169) Co-authored-by: Andrey Yegorov <andrey.yegorov@datastax.com> (cherry picked from commit 03f8b80) Co-authored-by: Nicolò Boschi <boschi1997@gmail.com>
Motivation
When using a KCA-based sink connector on a topic with a KeyValue<Avro, Avro> schema, if a message has a null value, the connector throws:
Modifications
Handle conversion of null values to the Kafka value.
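The idea of wrapping the value schema so that it always reports itself as optional can be sketched as a small decorator. This is only an illustration under stated assumptions: `SimpleSchema`, `RequiredSchema`, `ForcedOptional`, and `convert` are hypothetical names, the interface is a reduced stand-in for a Kafka Connect schema, and the exception message is invented for the example.

```java
import java.util.Objects;

public class OptionalForcingSketch {

    /** Hypothetical, reduced stand-in for a Kafka Connect schema (not the real interface). */
    interface SimpleSchema {
        String name();
        boolean isOptional();
        Object defaultValue();
    }

    /** A required schema with no default value. */
    static final class RequiredSchema implements SimpleSchema {
        private final String name;

        RequiredSchema(String name) {
            this.name = name;
        }

        public String name() { return name; }
        public boolean isOptional() { return false; }
        public Object defaultValue() { return null; }
    }

    /** Decorator: delegates everything except isOptional(), which always returns true. */
    static final class ForcedOptional implements SimpleSchema {
        private final SimpleSchema delegate;

        ForcedOptional(SimpleSchema delegate) {
            this.delegate = Objects.requireNonNull(delegate);
        }

        public String name() { return delegate.name(); }
        public boolean isOptional() { return true; }
        public Object defaultValue() { return delegate.defaultValue(); }
    }

    /** Passes non-null values through; accepts null only via a default or an optional schema. */
    static Object convert(Object value, SimpleSchema schema) {
        if (value != null) {
            return value;
        }
        if (schema.defaultValue() != null) {
            return schema.defaultValue();
        }
        if (schema.isOptional()) {
            return null;
        }
        // Illustrative message only; not the connector's actual error text.
        throw new IllegalStateException("null value for required schema: " + schema.name());
    }
}
```

With this model, `convert(null, new RequiredSchema("value"))` throws, while `convert(null, new ForcedOptional(new RequiredSchema("value")))` returns null. The decorator leaves the wrapped schema untouched, so other callers of the original schema keep its strictness.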
Verifying this change
Documentation
doc
doc-required
doc-not-needed
doc-complete